package day02.transform;

import beans.SensorReading;
import enums.TemperatureType;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;

/**
 * Flink stream-processing API demo: splitting a stream with {@code split} and {@code select}.
 *
 * @author lvbingbing
 * @date 2021-12-13 22:10
 */
public class FlinkTransform04 {
    public static void main(String[] args) throws Exception {
        // Build the shared helper with a parallelism of 1. Per the original author's note its
        // constructor sets up the environment and loads the readings from a file
        // (not visible from this class - confirm against FlinkTransform00).
        FlinkTransform00 transform = new FlinkTransform00(1);

        // Grab the execution environment first, then the source stream to experiment on.
        StreamExecutionEnvironment environment = transform.getEnv();
        DataStream<SensorReading> readings = transform.getSensorReadingStream();

        // Wire up the split / select topology.
        studySplitSelect(readings);

        // Nothing runs until the job is explicitly submitted.
        environment.execute();
    }

    /**
     * Demonstrates {@code split} and {@code select}.
     * <p>
     * {@code split()}: DataStream → SplitStream. Tags every element of a DataStream with one or
     * more output names so that it can later be divided into two or more DataStreams.
     * <p>
     * {@code select()}: SplitStream → DataStream. Retrieves the DataStream made up of the elements
     * tagged with one or more of the requested output names.
     * <p>
     * Three streams are selected here and each is printed with its own prefix:
     * {@code "high"}, {@code "low"} and {@code "all"} (both tags together).
     *
     * @param sensorReadingStream the sensor readings to split
     */
    private static void studySplitSelect(DataStream<SensorReading> sensorReadingStream) {
        // Tag every reading as HIGH or LOW.
        SplitStream<SensorReading> tagged = sensorReadingStream.split(FlinkTransform04::tagByTemperature);

        // Pull the individual streams back out by tag and print each one.
        tagged.select(TemperatureType.HIGH.type).print("high");
        tagged.select(TemperatureType.LOW.type).print("low");
        tagged.select(TemperatureType.HIGH.type, TemperatureType.LOW.type).print("all");
    }

    /**
     * Output selector used by {@link #studySplitSelect}: chooses the tag for a single reading.
     *
     * @param value the reading to classify
     * @return a one-element list holding the HIGH tag when the reading's temperature compares
     *         greater than 30.0, otherwise a one-element list holding the LOW tag
     */
    private static Iterable<String> tagByTemperature(SensorReading value) {
        if (value.getTemperature().compareTo(30.0) > 0) {
            return Collections.singletonList(TemperatureType.HIGH.type);
        }
        return Collections.singletonList(TemperatureType.LOW.type);
    }
}
